package cn.iocoder.yudao.module.system.service.chatconversation;

import cn.iocoder.yudao.framework.common.exception.ErrorCode;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi;
import cn.iocoder.yudao.module.system.dal.dataobject.user.UserAssetsDO;
import cn.iocoder.yudao.module.system.dal.mysql.user.UserAssetsMapper;
import cn.iocoder.yudao.module.system.service.user.UserAssetsService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;

import lombok.val;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.iocoder.yudao.module.system.controller.admin.conversation.vo.ChatConversationPageReqVO;
import cn.iocoder.yudao.module.system.controller.admin.conversation.vo.ChatConversationRespVO;
import cn.iocoder.yudao.module.system.controller.admin.conversation.vo.ChatConversationSaveReqVO;
import cn.iocoder.yudao.module.system.dal.dataobject.chatconversation.ChatConversationDO;
import cn.iocoder.yudao.module.system.dal.mysql.chatconversation.ChatConversationMapper;
import cn.iocoder.yudao.module.system.util.baidu.BaiduAiUtils;
// 移除 AppBuilderServerException 的导入
// 移除 AppBuilderClientIterator 和 AppBuilderClientResult 的导入
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;

import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import javax.annotation.Resource;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;

import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils.getLoginUserId;
import static cn.iocoder.yudao.module.system.util.baidu.BaiduAiUtils.APP_BUILDER_TOKEN;

/**
 * AI 聊天对话 Service 实现类
 *
 * @author 芋道源码
 */
@Service
@Validated
public class ChatConversationServiceImpl implements ChatConversationService {

    private static final Logger log = LoggerFactory.getLogger(ChatConversationServiceImpl.class);

    private static final int MAX_RETRIES = 3;
    private static final long INITIAL_BACKOFF = 1000L; // 1秒

    @Resource
    private ChatConversationMapper chatConversationMapper;

    @Resource
    UserAssetsMapper userAssetsMapper;

    @Resource
    private UserAssetsService userAssetsService;

    private final ExecutorService chatExecutor = new ThreadPoolExecutor(
            5, // 核心线程数
            10, // 最大线程数
            60L, // 空闲线程存活时间
            TimeUnit.SECONDS, // 时间单位
            new LinkedBlockingQueue<>(100), // 工作队列
            new ThreadFactoryBuilder().setNameFormat("chat-pool-%d").build(), // 线程工厂
            new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略


    @Override
    public Flux<CommonResult<ChatConversationRespVO>> createChatConversation(ChatConversationSaveReqVO createReqVO) throws IOException {
        //每次对话扣减可对话次数
        if(createReqVO.getUserId()==null){
            Long loginUserId = getLoginUserId();
            if (loginUserId != null) {
                createReqVO.setUserId(loginUserId);
            }else {
                throw exception(new ErrorCode(404,"用户未登录"));
            }
        }

        // 检查用户问答会员剩余次数
        userAssetsService.validateAskMembership(createReqVO.getUserId());

        // 插入初始对话记录
        ChatConversationDO chatConversation = BeanUtils.toBean(createReqVO, ChatConversationDO.class);
        chatConversation.setRoleId(1L);
        chatConversationMapper.insert(chatConversation);

        final Long conversationId = chatConversation.getId();
        final Long userId = createReqVO.getUserId();

        // 创建Flux流式返回
        return Flux.create(sink -> {
            // 使用线程池处理AI对话
            CompletableFuture.runAsync(() -> {
                try {
                    // 使用ThreadLocal确保线程安全
                    ThreadLocal<String> tokenHolder = ThreadLocal.withInitial(() -> APP_BUILDER_TOKEN);
                    System.setProperty("APPBUILDER_TOKEN", tokenHolder.get());

                    String answer = retryBaiduAiChat(createReqVO.getTitle(),false);
                    List<String> responses = new ArrayList<>();

                    if (answer != null) {
                        try {
                            // 发送实时响应
                            ChatConversationRespVO respVO = new ChatConversationRespVO();
                            respVO.setId(conversationId);
                            respVO.setContent(answer);
                            sink.next(CommonResult.success(respVO));

                            // 仅保存非空回复
                            if (!answer.trim().isEmpty()) {
                                responses.add(answer);
                            }
                        } catch (Exception e) {
                            log.error("Error processing chat response", e);
                        }
                    }

                    // 保存系统回复
                    try {
                        if (!responses.isEmpty()) {
                            ChatConversationDO chatConversationSystem = new ChatConversationDO();
                            chatConversationSystem.setUserId(userId);
                            // 使用String.join更高效地连接字符串
                            chatConversationSystem.setSystemMessage(String.join("", responses));
                            chatConversationSystem.setRoleId(2L);
                            chatConversationMapper.insert(chatConversationSystem);

                            // 用户资产已在validateAskMembership方法中更新
                        }
                    } catch (Exception e) {
                        log.error("Error saving system response", e);
                    }

                    sink.complete();
                    tokenHolder.remove(); // 清理ThreadLocal
                } catch (Exception e) {
                    log.error("Error in chat conversation", e);
                    sink.error(e);
                }
            }, chatExecutor);
        }, FluxSink.OverflowStrategy.BUFFER); // 使用BUFFER策略处理背压
    }

    @Override
    public void updateChatConversation(ChatConversationSaveReqVO updateReqVO) {
        // 校验存在
        validateChatConversationExists(updateReqVO.getId());
        // 更新
        ChatConversationDO updateObj = BeanUtils.toBean(updateReqVO, ChatConversationDO.class);
        chatConversationMapper.updateById(updateObj);
    }

    @Override
    public void deleteChatConversation(Long id) {
        // 校验存在
        validateChatConversationExists(id);
        // 删除
        chatConversationMapper.deleteById(id);
    }

    /**
     * 使用指数退避策略重试BaiduAI聊天接口
     *
     * @param title 对话标题
     * @return String 如果所有重试都失败则返回null
     */
    private String retryBaiduAiChat(String title,Boolean stream) {
        int retries = 0;
        long backoff = INITIAL_BACKOFF;

        while (retries < MAX_RETRIES) {
            try {
                return BaiduAiUtils.chat(BaiduAiUtils.Version.NORMAL.getAppId(), title, stream);
            } catch (Exception e) {
                retries++;
                if (retries == MAX_RETRIES) {
                    log.error("Failed to call BaiduAI chat after {} retries", MAX_RETRIES, e);
                    return null;
                }
                log.warn("Retry {} failed, will retry in {} ms", retries, backoff, e);
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return null;
                }
                // 指数退避，每次失败后等待时间翻倍
                backoff *= 2;
            }
        }
        return null;
    }

    private void validateChatConversationExists(Long id) {
        if (chatConversationMapper.selectById(id) == null) {
            throw exception(new ErrorCode(404,"AI 聊天对话不存在"));
        }
    }

    @Override
    public ChatConversationDO getChatConversation(Long id) {
        return chatConversationMapper.selectById(id);
    }

    @Override
    public PageResult<ChatConversationDO> getChatConversationPage(ChatConversationPageReqVO pageReqVO) {
        return chatConversationMapper.selectPage(pageReqVO);
    }

}